Stream App
What is a Stream App?
A stream app runs continuously on the server for the duration of an active data stream. Stream apps can be time based or depth based.
The app receives real-time data as events, each containing a list of records, and is invoked whenever the data provider delivers new data to Corva.
The records are typically continuous one-second or one-foot data, though they can arrive with latency.
The stream app receives corva#wits data for drilling time based apps, corva#drilling.wits.depth data for drilling depth based apps, corva#completion.wits data for completions, and corva#wireline.wits data for wireline.
From the stream event, the app can read the records as well as individual indexes and key values such as asset_id, company_id, stage number (completions), measured depth (drilling depth), log identifier (drilling depth), and other indexes and key values included in each event record.
When to build a Stream App?
Stream apps are most frequently used for real-time operational applications. Typical use cases are those where your business logic requires real-time data at regular one-second (time) or fractional-foot (depth) intervals. Most are visualizations that require a detailed, granular look at the incoming data or that might impact real-time operational decision making.
I require real-time data
I require regularly sampled data with interval = 1s or fractions of 1ft
I require the app to be provisioned to the asset stream
I do not require the app to be invoked from a front-end app
I do not require the app to be triggered
If you do not require real-time data at one-second or one-foot intervals (or the next available measurement), you should use a Scheduled App.
Where to find the data to build a Stream App?
The Corva Dev Center Dataset Explorer is an easy way to see a list of Corva datasets and the data stored in each dataset. Another option is to use Corva's API tools, located on the Dev Center Intro page, to make a GET API request to your desired dataset.
Stream apps only run on active streams, for example wits, drilling.wits.depth, completion.wits or wireline.wits. To determine what data is available, you will need to review the current wits data available for either drilling time based: corva#wits, drilling depth based: corva#drilling.wits.depth, completions: corva#completion.wits or wireline: corva#wireline.wits. Data channels that Corva receives may vary from well to well depending on the data provider. See examples of each type of stream data below.
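The GET request mentioned above can be sketched with only the standard library. This is an illustrative sketch, not an authoritative client: it assumes the Data API root https://data.corva.ai, an API key exported as CORVA_API_KEY, the "API <key>" Authorization header format, and MongoDB-style query/sort/limit parameters; the helper names are hypothetical.

```python
import json
import os
import urllib.parse
import urllib.request

def build_wits_query(asset_id: int, limit: int = 5) -> str:
    """Build a Data API query string: a MongoDB-style filter,
    newest-first sort, and a record limit (hypothetical helper)."""
    return urllib.parse.urlencode({
        "query": json.dumps({"asset_id": asset_id}),
        "sort": json.dumps({"timestamp": -1}),
        "limit": limit,
    })

def fetch_latest_wits(asset_id: int, limit: int = 5) -> list:
    # Assumes the standard Data API root and an API key in CORVA_API_KEY.
    url = ("https://data.corva.ai/api/v1/data/corva/wits/?"
           + build_wits_query(asset_id, limit))
    request = urllib.request.Request(
        url, headers={"Authorization": f"API {os.environ['CORVA_API_KEY']}"}
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
```

Inspecting a handful of the newest records this way is often enough to confirm which channels a given well's data provider actually sends.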
[
{
"_id": "6144da0ef249cd412db8f4f4",
"version": 1,
"provider": "corva",
"collection": "wits",
"timestamp": 1436941080,
"asset_id": 123456,
"company_id": 1,
"app": "corva.app",
"data_raw": {},
"metadata": {
"drillstring": "6144b8919c07fd21d8cb184b",
"casing": null,
"mud": "6144b9f836cfd72a42a97542",
"cuttings": null,
"surface-equipment": "6144b9e46764c92dff879d33",
"actual_survey": "6144ba975140c7244f842905",
"plan_survey": null
},
"data": {
"entry_at": 1436941080,
"mud g/l alarm state": 0,
"total mud volume": 0,
"trip tank mud volume": 0,
"line wear": 9313,
"totalpumpdisplacement": 0,
"on bottom hours": 24.1,
"circulating hours": 35.5,
"tool face": 1234,
"inclination": 2.9,
"azimuth": 154.06,
"h2s": 1,
"nitrogen pressure in": 1,
"nitrogen volume in": 0.9,
"hydrocarbon flow": 0,
"mwd temperature": 0,
"mwd vibration count": 0,
"mwd general variable 1": 0,
"condensate out": 0.84,
"mwd general variable 3": 0,
"mwd general variable 4": 0,
"mwd vibration xy gamma": 0,
"mwd vibration z gamma": 0,
"over pull": 0,
"fill strokes": 0,
"total fill strokes": 0,
"mwd general variable 7": 0,
"mwd general variable 8": 0,
"mwd general variable 9": 0,
"mwd general variable 10": 0,
"min wob": 0,
"surface stick slip index": 0,
"mwd dynamic inc": 0,
"mwd dynamic azi": 0,
"mwd vibration xyz gamma": 0,
"cement fluid temp": 0,
"mwd vibration xyz": 0,
"autodriller status 2 uw": 0,
"nitrogen volume out": 0.5,
"autodriller status 2 lw": 0,
"trip tank 1 low threshold": 0,
"trip tank 1 high threshold": 0,
"total gas return": 1,
"trip tank 2 low threshold": 0,
"trip tank 2 high threshold": 0,
"sensor depth": 0,
"bit rpm": 0,
"pvt total mud gain/loss": 0,
"mwd general variable 2": 0,
"wits lag depth": 0,
"pason lag depth": 6923.4,
"autodriller drum ticks": 0,
"flow data air (cmf)": 0,
"flow data pressure (psig)": 0,
"flow data temp (f)": 89,
"tts weight on bit": 0,
"drilling activity": 0,
"min tts weight on bit": 0,
"trip speed": 0,
"pump 4 total strokes": 0,
"autodriller status": 0,
"autodriller sensitivity": 0,
"convertible torque": 0,
"min convertible torque": 0,
"mwd general variable 5": 0,
"mwd general variable 6": 0,
"porosity 2 depth": 432,
"formation density depth": 0.51,
"min pressure": 0,
"min hook load": 0,
"min torque": 0,
"min rpm": 0,
"mwd general variable 0": 0,
"relative mse": 0,
"motor rpm": 0,
"porosity 1 depth": 0.87,
"autodriller wob": 0,
"autodriller brake pos": 0,
"autodriller diff press": 0,
"autodriller status uw": 0,
"autodriller status lw": 0,
"autodriller off bottom rop limit": 0,
"autodriller ticks per depth": 0,
"pvt mud tanks included": 0,
"pvt monitor mud gain/loss": 0,
"pvt mud g/l threshold": 0,
"trip tank 1 refill status": 0,
"flow 1 g/l threshold": 0,
"xy accel_ severity level": 0,
"z accel_ severity level": 0,
"trip tank fill": 0,
"trip tank accum": 0,
"rate of penetration": 0,
"time of penetration": 0,
"memos": "WELL CAPPED",
"bit_depth": 6921.1,
"block_height": 21.6,
"hole_depth": 6923.5,
"diff_press": 0,
"hook_load": 0,
"rop": 0,
"rotary_rpm": 0,
"pump_spm_1": 0,
"pump_spm_2": 0,
"standpipe_pressure": 0,
"rotary_torque": 0,
"pump_spm_total": 0,
"mud_flow_in": 0,
"strks_total": 0,
"strks_pump_3": 0,
"weight_on_bit": 0,
"strks_pump_1": 0,
"strks_pump_2": 0,
"ad_rop_setpoint": 0,
"ad_wob_setpoint": 0,
"gravity_tool_face": 26,
"magnetic_tool_face": 234,
"ad_diff_press_setpoint": 0,
"gamma_ray": 0,
"true_vertical_depth": 6890.78,
"state": "In Slips"
}
}
]
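Each object in the array above is a single record: top-level keys like asset_id and timestamp are indexes, while channel values live in the nested data object. A minimal sketch of reading one record, using a trimmed-down copy of the example above (channel availability varies by well, so optional channels are read with a default):

```python
# A trimmed-down corva#wits record, copied from the example above.
record = {
    "collection": "wits",
    "timestamp": 1436941080,
    "asset_id": 123456,
    "company_id": 1,
    "data": {
        "bit_depth": 6921.1,
        "hole_depth": 6923.5,
        "hook_load": 0,
        "weight_on_bit": 0,
        "state": "In Slips",
    },
}

# Top-level keys are indexes and identifiers.
asset_id = record["asset_id"]
bit_depth = record["data"]["bit_depth"]

# Channels vary from well to well, so read optional ones with a default.
gamma_ray = record["data"].get("gamma_ray", 0)

print(asset_id, bit_depth, gamma_ray)  # 123456 6921.1 0
```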
[
{
"_id":"638e08ad591fc9530bc95021",
"version":1,
"provider":"corva",
"collection":"drilling.wits.depth",
"asset_id":123456,
"data":{
"dep":5.23,
"ctda":0,
"ctdi":0.00000572499837536711,
"hdtv":5.22999999999999,
"remarks":null,
"tvd":5.2
},
"measured_depth":5.2,
"timestamp_read":1670252838,
"company_id":1,
"app":"corva.witsml-depth-source",
"log_identifier":"222f83a8ad8b"
},
{
"_id":"638e1914591fc9530bc97ec0",
"version":1,
"provider":"corva",
"collection":"drilling.wits.depth",
"measured_depth":4780,
"timestamp_read":1670256913,
"asset_id":123456,
"company_id":1,
"app":"corva.witsml-depth-source",
"log_identifier":"16a57de71bb2",
"data":{
"dep": 4780,
"hob": 2610.4,
"bitf": 2125,
"ctda": 294.647081162492,
"ctdi": 9.49662567605937,
"hdtv": 4740.39582281383,
"ropa": 29.4941738247871,
"dmiavg": 12.3505077891987,
"dmoavg": 0,
"fliavg": 1300.42742359428,
"floavg": 910.745726579475,
"ghcavg": 0.083685715571959,
"ghcmax": 0.108048261328046,
"hkldav": 128.081843750071,
"lagdep": 4764.64990234375,
"sppavg": 3456.2018385,
"tmiavg": 117.404556274414,
"tmoavg": 117.404556274414,
"tqabav": 6.77594677734375,
"wobavg": 10.4961884765684,
"rpmsavg": 168.743540454799,
"remarks": null,
"gcc3av": 3.77634501457214,
"gcc5nea": 0,
"gcc4na": 0,
"gcc5na": 0,
"gcc4ia": 0,
"gcc2av": 11.1370010375977,
"gcc1av": 1043.97717285156,
"gcc5ia": 0,
"tvd": 4740.45
}
}
]
[
{
"_id": "614bb07772ac9e2516ed92d2",
"version": 1,
"provider": "corva",
"collection": "completion.wits",
"timestamp": 1447635400,
"asset_id": 123456,
"company_id": 1,
"stage_number": 28,
"data": {
"timestamp": 1447635400,
"is_valid": true,
"wellhead_pressure": 0,
"slurry_flow_rate_in": 0,
"elapsed_time": 8260,
"clean_flow_rate_in": 0,
"total_clean_volume_in": 7467.806170953469,
"total_slurry_volume_in": 7873.681666666418,
"total_chemical_rate_in": 0,
"total_friction_reducer": 0,
"total_proppant_concentration": 0,
"total_proppant_mass": 376994.62118233106,
"bottomhole_proppant_concentration": 0,
"hydrostatic_pressure": 3243.6517636200106,
"inverse_hydrostatic_pressure": 0.0003082945004194811,
"backside_pressure": 4,
"enzyme_breaker": 0,
"scale_inhibitor": 0,
"surfactant": 0,
"friction_reducer": 0,
"friction_reducer_extra": 0,
"cross_linker": 0,
"acid": 0,
"gel": 0,
"ph_adjusting_agent": 0,
"accelerator": 0,
"fluid_loss": 0,
"ploymer_plug": 0,
"acid_inhibitor": 0,
"acid_retarder": 0,
"emulsifier": 0,
"clay_stabilizer": 0,
"non_emulsifier": 0,
"fines_suspender": 0,
"anti_sludge": 0,
"iron_control": 0,
"oxygen_scavenger": 0,
"mutual_solvent": 0,
"corrosion_inhibitor": 0,
"paraffin_control": 0,
"biocide": 0,
"instant_crosslinker": 0,
"delayed_crosslinker": 0,
"liquid_breaker": 0,
"powder_breaker": 0,
"divertor": 0,
"powder_gel": 0,
"powder_friction_reducer": 0,
"powder_enzyme_breaker": 0,
"proppant_1_concentration": 0,
"proppant_2_concentration": 0,
"tr press deriv": 0.54,
"slur rate2": 0,
"prop con2": 0,
"bh prop con": 0,
"tmt 100 mesh sand": 137414,
"tmt 40/70 white": 237635,
"j623 conc": 0,
"j624 conc": 1.6,
"pod cr dry conc raw": 374.1,
"an press 1h": 4,
"end_of_stage": 1,
"proppant_1_mass": 375049,
"pumpside_pressure": -14,
"extra_clean_fluid": 0
}
}
]
[
{
"_id": "5f161be7de7b936052e8bd5e",
"timestamp": 1574279322,
"version": 1,
"provider": "corva",
"collection": "wireline.wits",
"asset_id": 123456,
"company_id": 1,
"stage_number": 2,
"app": "corva.app",
"data": {
"c_006": -0.5522,
"c_007": -999.25,
"current": 0,
"voltage": 0,
"line_speed": 0.05,
"elapsed_time": 1119.9359,
"line_tension": 320.5558,
"measured_depth": 274.7967,
"casing_collar_locator": -0.08,
"state": "Unclassified",
"wellbore_orientation": "Vertical"
}
}
]
How to build a Stream App?
The following examples demonstrate the basic building blocks of a stream application while utilizing the imported functionality.
1. Stream Time Event
The use case for the example below: the app needs to retrieve one-second hookload and weight on bit data from the corva#wits StreamTimeEvent, add the hookload and weight on bit values together, then POST the calculated values to a custom dataset named example-stream-time-app.
1.1 Follow the Getting Started Tutorial to install the prerequisites, create and copy app to your local machine.
1.2. Open and review the README.md file.
# Dev Center Python Real-Time Stream Data App
## Getting started
[Python SDK Documentation](https://corva-ai.github.io/python-sdk)
### 1. Install
You need to have `make` installed:
- Windows: [Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install)
- OS X: [Command Line Tools for Xcode](https://developer.apple.com/download/more/)
- Linux: refer to your distro-specific docs
To install dependencies run:
`make`
### 2. Run Tests
`make test`
### 3. Create app zip
`make bundle`
Example of a README.md file
1.3. Update to the latest Python SDK version in the requirements.txt file
corva-sdk==1.8.0
pytest==7.1.1
Example of a requirements.txt file for a StreamTimeEvent app
1.3.1. In the command line run
pip install -U corva-sdk
1.3.2 Or, in the requirements.txt file, set the latest SDK version (e.g. corva-sdk==1.8.0), then run
pip install -r requirements.txt
1.4. Optional: Install make dependency
1.4.1 Install make
make install
make all
1.5 Make adjustments to the manifest.json file
{
"format": 1,
"license": {
"type": "MIT",
"url": "https://www.oandgexample.com/license/"
},
"developer": {
"name": "O&G Company",
"identifier": "oandgc",
"authors": []
},
"application": {
"type": "stream",
"key": "big-data-energy.example_stream_time_app",
"visibility": "private",
"name": "Example Stream App",
"description": "This is the description of my app. You can do great things with it!",
"summary": "More information about this app goes here",
"category": "analytics",
"website": "https://www.oandgexample.com/my-app/",
"segments": [
"drilling"
]
},
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {},
"runtime": "python3.8",
"app": {}
},
"datasets": {
"big-data-energy.example-stream-time-app": {
"permissions": [
"read",
"write"
]
}
}
}
Example of a manifest.json file for a StreamTimeEvent app
1.5.1 Set read and/or write permissions to datasets in the manifest.json file
Read and write permissions can only be granted to company datasets (non-Corva), e.g. big-data-energy#example-stream-time-app. Read only permissions can be granted to Corva datasets, e.g. corva#wits. The example below shows how to grant read and write permissions to big-data-energy#example-stream-time-app.
"datasets": {
"big-data-energy.example-stream-time-app": {
"permissions": [
"read",
"write"
]
}
}
Example of dataset read/write permissions in the manifest.json file for a StreamTimeEvent app
1.5.2 Optional: Set Python Log Levels in environment variables in the manifest.json file
The default Log Level in the Corva SDK is Logger.info(). In order to change the default Log Level, you must set the log level in the environment variables within the manifest.json file. The example below shows how to change the default Log Level to Logger.debug().
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {}
},
Example of Python log levels environment variable setting in the manifest.json file for a StreamTimeEvent app
Note: Please refer to Python's Logging Documentation to learn about the different log levels here Logging HOWTO and the Corva Logging Documentation.
1.5.3 Optional: Increase the application timeout time in the manifest.json file
If your app runs longer than the configured timeout value, first make sure the Lambda function is idempotent, since a failed invocation may be reinvoked. If a longer runtime is genuinely required, increase the timeout value. The default timeout value for an application is 120 seconds. The maximum value for the application timeout is 900 seconds. The example below shows the application timeout increased to 240 seconds.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 240,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {}
},
Example of app timeout setting in the manifest.json file for a StreamTimeEvent app
Note: Please refer to AWS Lambda's Documentation to learn about lambda function best practices here Best practices for working with AWS Lambda functions.
1.5.4 Optional: Increase the application memory in the manifest.json file
If your app imports large code libraries, performs memory-intensive tasks, or runs much slower than expected, then you may need to increase the memory setting. The default (and minimum) memory value for an application is 128 MB. The maximum value for the application memory is 10,240 MB; memory is allocated in 128 MB increments. The example below shows the application memory increased to 640 MB.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 640,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {}
},
Example of app memory setting in the manifest.json file for a StreamTimeEvent app
Note: Please refer to AWS Lambda's Documentation to learn about lambda function best practices here Best practices for working with AWS Lambda functions.
1.6 Implement logic in the lambda_function.py file
Now that the app is configured, you can implement the logic in the lambda_function.py file.
Note: Implementing the logic directly in the lambda_function.py file is the most basic approach. You also have the option to create directories and use Python libraries like Pydantic.
# 1. Import required functionality.
from corva import Api, Cache, Logger, StreamTimeEvent, stream
# 2. Decorate your function using @stream. Use the existing lambda_handler function or define your own. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@stream
def lambda_handler(event: StreamTimeEvent, api: Api, cache: Cache):
# 3. Here is where you can declare your variables from the argument event: StreamTimeEvent and start using Api, Cache and Logger functionalities. You can obtain key values directly from metadata in the stream app event without making any additional API requests.
# You have access to asset_id, company_id, and real-time data records from event.
asset_id = event.asset_id
company_id = event.company_id
records = event.records
# Records is a list
record_count = len(records)
# Each element of records has a timestamp. You can declare variables for start and end timestamps.
start_timestamp = records[0].timestamp
end_timestamp = records[-1].timestamp
# Utilize the Logger functionality. The default log level is Logger.info. To use a different log level, the log level must be specified in the manifest.json file in the "settings.environment": {"LOG_LEVEL": "DEBUG"}. See the Logger documentation for more information.
Logger.info(f"{asset_id=} {company_id=}")
Logger.info(f"{start_timestamp=} {end_timestamp=} {record_count=}")
# Utilize the Cache functionality to get a stored key value. The Cache functionality is built on Redis. See the Cache documentation for more information.
# Getting last exported timestamp from Cache
last_exported_timestamp = int(cache.get(key='last_exported_timestamp') or 0)
# 4. Here is where you can add your app logic.
# Setting up an array to collect the output records
outputs = []
for record in records:
# Making sure we are not processing duplicate data
if record.timestamp <= last_exported_timestamp:
continue
# Each element of records has data. This is how to get specific key values from an embedded object
weight_on_bit = record.data.get("weight_on_bit", 0)
hook_load = record.data.get("hook_load", 0)
# This is how to set up a body of a POST request to store the hook_load and weight_on_bit data from the StreamTimeEvent and newly calculated wob_plus_hkld value
output = {
"timestamp": record.timestamp,
"asset_id": asset_id,
"company_id": company_id,
"provider": "big-data-energy",
"collection": "example-stream-time-app",
"data": {
"hook_load": hook_load,
"weight_on_bit": weight_on_bit,
"wob_plus_hkld": weight_on_bit + hook_load
},
"version": 1
}
# Appending the new data to the empty array
outputs.append(output)
# 5. Save the newly calculated data in a custom dataset
# No exception handling here: if the request fails, the Lambda will be reinvoked and reprocess the event
if outputs:
# Utilize Logger functionality to confirm data in log files
Logger.debug(f"{outputs=}")
# Utilize the Api functionality. The data=outputs needs to be an array because Corva's data is saved as an array of record objects. See the Api documentation for more information.
api.post(
"api/v1/data/big-data-energy/example-stream-time-app/", data=outputs,
).raise_for_status()
# Utilize the Cache functionality to set a key value. The Cache functionality is built on Redis. See the Cache documentation for more information. This example is setting the last timestamp of the output to Cache
cache.set(key='last_exported_timestamp', value=outputs[-1].get("timestamp"))
return outputs
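The duplicate-skipping check in step 4 above can be isolated as a pure function, which makes the pattern easy to unit test on its own: only records strictly newer than the last exported timestamp survive. The function name and sample records below are illustrative.

```python
def filter_new_records(records, last_exported_timestamp):
    """Keep only records strictly newer than the cached timestamp,
    mirroring the duplicate check inside lambda_handler."""
    return [r for r in records if r["timestamp"] > last_exported_timestamp]

# Illustrative records: the first one was already exported on a prior run.
records = [
    {"timestamp": 1436941080, "weight_on_bit": 0},
    {"timestamp": 1436941081, "weight_on_bit": 5},
    {"timestamp": 1436941082, "weight_on_bit": 7},
]

fresh = filter_new_records(records, last_exported_timestamp=1436941080)
print([r["timestamp"] for r in fresh])  # [1436941081, 1436941082]
```

Because stream events can redeliver overlapping data, pairing this filter with the cached last_exported_timestamp keeps the POSTed dataset free of duplicates.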
1.7 Locally test your application
1.7.1 Running a local test for your Corva App
To locally test your Corva app, you need to follow these steps:
1. Create a local_run.py file in your project directory.
- This file will contain the code that simulates the environment in which your app will run.
- It uses environment variables for authentication and API access.
2. Set environment variables on your local machine.
Ensure you have set the appropriate environment variables, as shown in the code below. Here's an example of how to export environment variables in your terminal:
export API_ROOT_URL="https://api.example.com"
export DATA_API_ROOT_URL="https://data-api.example.com"
export CORVA_API_KEY="your_api_key"
export APP_KEY="your_app_key"
You can add these export statements to your shell profile (e.g., .bashrc, .zshrc) for persistence, or run them directly in your terminal for the current session.
3. Run the local_run.py file.
Once you've created the file and set your environment variables, run the script using Python to simulate the app behavior in a local environment.
Example:
python local_run.py
4. Interpret the output.
- The output of your app will be printed in the terminal. You can use this output to verify the results of your function's execution in a local testing scenario.
Here's an example of what the local_run.py file should look like:
from corva import Api, Logger, StreamTimeRecord, StreamTimeEvent, Cache
from lambda_function import lambda_handler
if __name__ == '__main__':
import os
# Define test asset and company IDs
asset_id = 68056371
company_id = 80
# Creating a StreamTimeRecord with dummy data for hook load and weight on bit.
# lambda_handler reads these channels via record.data.get(...), so they must
# sit at the top level of the data object (not nested under another "data" key).
record = StreamTimeRecord(
timestamp=1722262415, # Example timestamp
data={
'hook_load': 20000, # Example hook load in pounds
'weight_on_bit': 15000, # Example weight on bit in pounds
},
)
# Creating a StreamTimeEvent containing the record
_event = StreamTimeEvent(asset_id=asset_id, company_id=company_id, records=[record])
# Define a custom BearerApi class to handle API requests with authentication headers
class BearerApi(Api):
@property
def default_headers(self):
return {
'Authorization': f'Bearer {self.api_key}', # Bearer token for authentication
'X-Corva-App': self.app_key, # Corva-specific app key
}
# Instantiate the BearerApi class using environment variables for API and app keys
api = BearerApi(
api_url=os.environ['API_ROOT_URL'], # Base URL for the Corva API
data_api_url=os.environ['DATA_API_ROOT_URL'], # Base URL for the data API
api_key=os.environ['CORVA_API_KEY'], # API key from environment
app_key=os.environ['APP_KEY'], # App key from environment
)
# Initialize the cache
cache = Cache(hash_name='washout', redis_dsn='redis://127.0.0.1:6379/0')
# Execute the lambda_handler function with the event, API instance, and cache
outputs = lambda_handler(event=_event, api=api, cache=cache)
# Print the outputs
print(outputs)
1.8 Deploy your application
Please see Getting Started section 4. Upload and Publish.
1.8.1 App Runner production testing
Please see App Runner section for more information.
1.9 Provision the application
For Company provisioning please see App Provisioning for more information.
For Corva Partners please see App Provisioning with a focus on App Purchases.
2. Depth Event
The use case for the example below: the app needs to GET real-time depth data for the hookload average (hkldav) and weight on bit average (wobavg) fields from the corva#drilling.wits.depth event, calculate the sum of hkldav and wobavg, then POST the result to a custom dataset named big-data-energy#example-stream-depth-app.
NOTE: To view the depth stream names and the depth stream's log identifier please make the following query:
https://api.corva.ai/v1/app_streams?company_id={companyId}&asset_id={assetId}&status=active&log_type=depth
App stream API request response example:
[
{
"id":44918,
"company_id":80,
"asset_id":72643645,
"name":"App Stream Example Well #1",
"configuration":{
},
"status":"active",
"visibility":"visible",
"segment":"drilling",
"source_type":"drilling",
"log_type":"depth",
"log_identifier":"16a57de71bb2",
"log_display_name":"Downhole_Depth",
"settings":{
"logIds":[
"6ebb880e-b2bc-3b3a-a11f-56b2deb0dbcf"
]
}
},
{
"id":44917,
"company_id":80,
"asset_id":72643645,
"name":"App Stream Example Well #1",
"configuration":{
},
"status":"active",
"visibility":"visible",
"segment":"drilling",
"source_type":"drilling",
"log_type":"depth",
"log_identifier":"222f83a8ad8b",
"log_display_name":"Surface_Depth",
"settings":{
"logIds":[
"8bc9832d-be1c-389b-aac2-f6e636b97ba3"
]
}
}
]
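The app_streams query in the note above can also be scripted. A sketch, assuming a valid API key exported as CORVA_API_KEY and the "API <key>" Authorization format; the helper names are illustrative:

```python
import json
import os
import urllib.parse
import urllib.request

def build_app_streams_url(company_id: int, asset_id: int) -> str:
    """Assemble the app_streams query shown in the note above:
    active depth streams for one company/asset pair."""
    params = urllib.parse.urlencode({
        "company_id": company_id,
        "asset_id": asset_id,
        "status": "active",
        "log_type": "depth",
    })
    return f"https://api.corva.ai/v1/app_streams?{params}"

def fetch_depth_streams(company_id: int, asset_id: int) -> list:
    # Each stream in the response carries the log_identifier a depth app
    # uses to tell the surface and downhole depth logs apart.
    request = urllib.request.Request(
        build_app_streams_url(company_id, asset_id),
        headers={"Authorization": f"API {os.environ['CORVA_API_KEY']}"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
```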
2.1 Follow the Getting Started Tutorial to install the prerequisites, create and copy app to your local machine.
2.2. Open and review the README.md file.
# Dev Center Python Stream Depth App
## Getting started
[Python SDK Documentation](https://corva-ai.github.io/python-sdk)
### 1. Install
You need to have `make` installed:
- Windows: [Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install)
- OS X: [Command Line Tools for Xcode](https://developer.apple.com/download/more/)
- Linux: refer to your distro-specific docs
To install dependencies run:
`make`
### 2. Run Tests
`make test`
### 3. Create app zip
`make bundle`
Example of a README.md file
2.3. Update to the latest Python SDK version in the requirements.txt file
corva-sdk==1.8.0
pytest==7.1.1
Example of a requirements.txt file for a StreamDepthEvent app
2.3.1. In the command line run
pip install -U corva-sdk
2.3.2 Or, in the requirements.txt file, set the latest SDK version (e.g. corva-sdk==1.8.0), then run
pip install -r requirements.txt
2.4. Optional: Install make dependency
2.4.1 Install make
make install
make all
2.5 Make adjustments to the manifest.json file
{
"format": 1,
"license": {
"type": "MIT",
"url": "https://www.oandgexample.com/license/"
},
"developer": {
"name": "O&G Company",
"identifier": "oandgc",
"authors": []
},
"application": {
"type": "stream",
"key": "big-data-energy.example_stream_depth_app",
"visibility": "private",
"name": "Example Stream Depth App",
"description": "This is the description of my app. You can do great things with it!",
"summary": "More information about this app goes here",
"category": "analytics",
"website": "https://www.oandgexample.com/my-app/",
"segments": [
"drilling"
]
},
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {},
"runtime": "python3.8",
"app": {}
},
"datasets": {
"big-data-energy.example-stream-depth-app": {
"permissions": [
"read",
"write"
]
}
}
}
Example of a manifest.json file for a StreamDepthEvent app
2.5.1 Set read and/or write permissions to datasets in the manifest.json file
Read and write permissions can only be given to company datasets (non-Corva), e.g. big-data-energy#example-stream-depth-app. Read permissions can be granted to Corva datasets, e.g. corva#drilling.wits.depth. The example below shows how to grant read and write permissions to big-data-energy#example-stream-depth-app.
"datasets": {
"big-data-energy.example-stream-depth-app": {
"permissions": [
"read",
"write"
]
}
}
Example of dataset read/write permissions in the manifest.json file for a StreamDepthEvent app
2.5.2 Optional: Set Python Log Levels in environment variables in the manifest.json file
The default Log Level in the Corva SDK is Logger.info(). In order to change the default Log Level, you must set the log level in the environment variables within the manifest.json file. The example below shows how to change the default Log Level to Logger.debug().
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {}
}
Example of Python log levels environment variable setting in the manifest.json file for a StreamDepthEvent app
Note: Please refer to Python's Logging Documentation to learn about the different log levels here Logging HOWTO and the Corva Logging Documentation.
2.5.3 Optional: Increase the application timeout time in the manifest.json file
If your app runs longer than the configured timeout value, first make sure the Lambda function is idempotent, since a failed invocation may be reinvoked. If a longer runtime is genuinely required, increase the timeout value. The default timeout value for an application is 120 seconds. The maximum value for the application timeout is 900 seconds. The example below shows the application timeout increased to 240 seconds.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 240,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {}
},
Example of app timeout setting in the manifest.json file for a StreamDepthEvent app
Note: Please refer to AWS Lambda's Documentation to learn about lambda function best practices here Best practices for working with AWS Lambda functions.
2.5.4 Optional: Increase the application memory in the manifest.json file
If your app imports large code libraries, performs memory-intensive tasks, or runs much slower than expected, then you may need to increase the memory setting. The default (and minimum) memory value for an application is 128 MB. The maximum value for the application memory is 10,240 MB; memory is allocated in 128 MB increments. The example below shows the application memory increased to 640 MB.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 640,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {}
},
Example of app memory setting in the manifest.json file for a StreamDepthEvent app
Note: Please refer to AWS Lambda's Documentation to learn about lambda function best practices here Best practices for working with AWS Lambda functions.
2.6 Implement logic in the lambda_function.py file
Now that the app is configured, you can implement the logic in the lambda_function.py file.
Note: Implementing the logic directly in the lambda_function.py file is the most basic approach. You also have the option to create directories and use Python libraries like Pydantic.
# 1. Import required functionality.
from corva import Api, Cache, Logger, StreamDepthEvent, stream
# 2. Decorate your function using @stream. Use the existing lambda_handler function or define your own. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@stream
def lambda_handler(event: StreamDepthEvent, api: Api, cache: Cache):
# 3. Here is where you can declare your variables from the argument event: StreamDepthEvent and start using Api, Cache and Logger functionalities. You can obtain key values directly from metadata in the stream app event without making any additional API requests.
# The stream depth app can declare the following attributes from the StreamDepthEvent: company_id: The company identifier; asset_id: The asset identifier; records: The records that include the indexes and data object with key values.
asset_id = event.asset_id
company_id = event.company_id
records = event.records
# Records is a list
record_count = len(records)
# Each element of records has a measured_depth. You can declare variables for start and end measured depth.
start_measured_depth = records[0].measured_depth
end_measured_depth = records[-1].measured_depth
# Utilize the Logger functionality. The default log level is Logger.info. To use a different log level, the log level must be specified in the manifest.json file in the "settings.environment": {"LOG_LEVEL": "DEBUG"}. See the Logger documentation for more information.
Logger.info(f"{asset_id=} {company_id=}")
Logger.info(f"{start_measured_depth=} {end_measured_depth=} {record_count=}")
# Utilize the Cache functionality to get a stored key value. The Cache functionality is built on Redis. See the Cache documentation for more information.
# Getting last exported measured_depth from Cache
last_exported_measured_depth = int(cache.get(key='last_exported_measured_depth') or 0)
# 4. Here is where you can add your app logic.
# Setting up an array to collect the output records
outputs = []
for record in records:
# Making sure we are not processing duplicate data
if record.measured_depth <= last_exported_measured_depth:
continue
# Each element of records has data. This is how to get specific key values from an embedded object
avg_weight_on_bit = record.data.get("wobavg", 0)
avg_hook_load = record.data.get("hkldav", 0)
# This is how to set up a body of a POST request to store the avg_hook_load and avg_weight_on_bit data from the StreamDepthEvent and newly calculated avg_wob_plus_hkld value
output = {
"measured_depth": record.measured_depth,
"asset_id": asset_id,
"company_id": company_id,
"provider": "big-data-energy",
"collection": "example-stream-depth-app",
"data": {
"avg_hook_load": avg_hook_load,
"avg_weight_on_bit": avg_weight_on_bit,
"avg_wob_plus_hkld": avg_weight_on_bit + avg_hook_load
},
"version": 1
}
# Appending the new data to the empty array
outputs.append(output)
# 5. Save the newly calculated data in a custom dataset
# Set up an if statement so that if request fails, lambda will be reinvoked. So no exception handling
if outputs:
# Utilize Logger functionality to confirm data in log files
Logger.debug(f"{outputs=}")
# Utilize the Api functionality. The data=outputs needs to be an an array because Corva's data is saved as an array of objects. Objects being records. See the Api documentation for more information.
api.post(
f"api/v1/data/big-data-energy/example-stream-depth-app/", data=outputs,
).raise_for_status()
# Utililize the Cache functionality to set a key value. The Cache functionality is built on Redis Cache. See the Cache documentation for more information. This example is setting the last measured_depth of the output to Cache
cache.set(key='last_exported_measured_depth', value=outputs[-1].get("measured_depth"))
return outputs
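The LOG_LEVEL referenced in the Logger comments above is configured in the app's manifest.json. A minimal sketch of the relevant fragment, assuming Corva's standard manifest layout (other required manifest fields omitted):

```json
{
  "settings": {
    "environment": {
      "LOG_LEVEL": "DEBUG"
    }
  }
}
```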
2.7 Locally test your application
2.7.1 Running a local test for your Corva App
To locally test your Corva app, you need to follow these steps:
Create a local_run.py file in your project directory.
- This file will contain the code that simulates the environment in which your app will run.
- It uses environment variables for authentication and API access.
Set environment variables on your local machine.
Ensure you have set the appropriate environment variables, as shown in the code below.
Here's an example of how to export environment variables in your terminal:
export API_ROOT_URL="https://api.example.com"
export DATA_API_ROOT_URL="https://data-api.example.com"
export CORVA_API_KEY="your_api_key"
export APP_KEY="your_app_key"
You can add these export statements to your shell profile (e.g., .bashrc, .zshrc) for persistence, or run them directly in your terminal for the current session.
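Before running the script, you can optionally confirm the variables are visible to Python. This is a small convenience check for illustration, not part of the Corva SDK; missing_env_vars is a hypothetical helper:

```python
import os

# Names of the variables the local_run.py script expects (as exported above)
REQUIRED_VARS = ("API_ROOT_URL", "DATA_API_ROOT_URL", "CORVA_API_KEY", "APP_KEY")

def missing_env_vars(names=REQUIRED_VARS):
    """Return the names of any required variables that are unset or empty."""
    return [name for name in names if not os.environ.get(name)]

print("Missing:", missing_env_vars())
```

If the list printed is empty, all four variables are set for the current session.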
Run the local_run.py file.
Once you've created the file and set your environment variables, run the script using Python to simulate the app behavior in a local environment.
Example:
python local_run.py
Interpret the output.
- The output of your app will be printed in the terminal. You can use this output to verify the results of your function's execution in a local testing scenario.
Here's an example of what the local_run.py file should look like:
from corva import Api, Cache, StreamDepthEvent, StreamDepthRecord
from lambda_function import lambda_handler

if __name__ == '__main__':
    import os

    # Define test asset and company IDs
    asset_id = 68056371
    company_id = 80
    # Creating a StreamDepthRecord with dummy data, using the same keys
    # the depth-based lambda_handler above reads (wobavg, hkldav)
    record = StreamDepthRecord(
        measured_depth=10000.0,  # Example measured depth in feet
        data={
            'wobavg': 15000,  # Example average weight on bit in pounds
            'hkldav': 20000,  # Example average hook load in pounds
        },
    )
    # Creating a StreamDepthEvent containing the record
    _event = StreamDepthEvent(asset_id=asset_id, company_id=company_id, records=[record])

    # Define a custom BearerApi class to handle API requests with authentication headers
    class BearerApi(Api):
        @property
        def default_headers(self):
            return {
                'Authorization': f'Bearer {self.api_key}',  # Bearer token for authentication
                'X-Corva-App': self.app_key,  # Corva-specific app key
            }

    # Instantiate the BearerApi class using environment variables for API and app keys
    api = BearerApi(
        api_url=os.environ['API_ROOT_URL'],  # Base URL for the Corva API
        data_api_url=os.environ['DATA_API_ROOT_URL'],  # Base URL for the data API
        api_key=os.environ['CORVA_API_KEY'],  # API key from environment
        app_key=os.environ['APP_KEY'],  # App key from environment
    )
    # Initialize the cache
    cache = Cache(hash_name='washout', redis_dsn='redis://127.0.0.1:6379/0')
    # Execute the lambda_handler function with the event, API instance, and cache
    outputs = lambda_handler(event=_event, api=api, cache=cache)
    # Print the outputs
    print(outputs)
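As a quick sanity check of the handler's arithmetic, independent of the Corva SDK and Redis, the per-record calculation can be reproduced in plain Python. combine is a hypothetical helper for illustration, mirroring the data.get calls in lambda_handler:

```python
def combine(record_data):
    # Mirror the handler: read the averages with a default of 0, then sum them
    avg_weight_on_bit = record_data.get("wobavg", 0)
    avg_hook_load = record_data.get("hkldav", 0)
    return {
        "avg_hook_load": avg_hook_load,
        "avg_weight_on_bit": avg_weight_on_bit,
        "avg_wob_plus_hkld": avg_weight_on_bit + avg_hook_load,
    }

print(combine({"wobavg": 15000, "hkldav": 20000}))
# → {'avg_hook_load': 20000, 'avg_weight_on_bit': 15000, 'avg_wob_plus_hkld': 35000}
```

Note that missing keys fall back to 0, matching the handler's behavior for records that lack wobavg or hkldav.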
2.8 Deploy your application
Please see Getting Started section 4. Upload and Publish.
2.8.1 App Runner production testing
Please see App Runner section for more information.
2.9 Provision the application
For Company provisioning please see App Provisioning for more information.
For Corva Partners please see App Provisioning with a focus on App Purchases.